/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io.kafka;



import com.bff.gaia.unified.sdk.coders.AvroCoder;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.io.UnboundedSource;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.Joiner;

import org.apache.kafka.clients.consumer.Consumer;

import org.apache.kafka.common.PartitionInfo;

import org.apache.kafka.common.TopicPartition;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import java.util.ArrayList;

import java.util.Collections;

import java.util.Comparator;

import java.util.List;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkState;



/**

 * An {@link UnboundedSource} to read from Kafka, used by {@link KafkaIO.Read} transform in KafkaIO. See

 * {@link KafkaIO} for user visible documentation and example usage.

 */

class KafkaUnboundedSource<K, V> extends UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> {



  /**

   * The partitions are evenly distributed among the splits. The number of splits returned is {@code

   * min(desiredNumSplits, totalNumPartitions)}, though better not to depend on the exact count.

   *

   * <p>It is important to assign the partitions deterministically so that we can support resuming a

   * split from last checkpoint. The Kafka partitions are sorted by {@code <topic, partition>} and

   * then assigned to splits in round-robin order.

   */

  @Override

  public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOptions options)

      throws Exception {



    List<TopicPartition> partitions = new ArrayList<>(spec.getTopicPartitions());



    // (a) fetch partitions for each topic

    // (b) sort by <topic, partition>

    // (c) round-robin assign the partitions to splits



    if (partitions.isEmpty()) {

      try (Consumer<?, ?> consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig())) {

        for (String topic : spec.getTopics()) {

          for (PartitionInfo p : consumer.partitionsFor(topic)) {

            partitions.add(new TopicPartition(p.topic(), p.partition()));

          }

        }

      }

    }



    partitions.sort(

        Comparator.comparing(TopicPartition::topic)

            .thenComparing(Comparator.comparingInt(TopicPartition::partition)));



    checkArgument(desiredNumSplits > 0);

    checkState(

        partitions.size() > 0,

        "Could not find any partitions. Please check Kafka configuration and topic names");



    int numSplits = Math.min(desiredNumSplits, partitions.size());

    List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);



    for (int i = 0; i < numSplits; i++) {

      assignments.add(new ArrayList<>());

    }

    for (int i = 0; i < partitions.size(); i++) {

      assignments.get(i % numSplits).add(partitions.get(i));

    }



    List<KafkaUnboundedSource<K, V>> result = new ArrayList<>(numSplits);



    for (int i = 0; i < numSplits; i++) {

      List<TopicPartition> assignedToSplit = assignments.get(i);



      LOG.info(

          "Partitions assigned to split {} (total {}): {}",

          i,

          assignedToSplit.size(),

          Joiner.on(",").join(assignedToSplit));



      result.add(

          new KafkaUnboundedSource<>(

              spec.toBuilder()

                  .setTopics(Collections.emptyList())

                  .setTopicPartitions(assignedToSplit)

                  .build(),

              i));

    }



    return result;

  }



  @Override

  public KafkaUnboundedReader<K, V> createReader(

      PipelineOptions options, KafkaCheckpointMark checkpointMark) {

    if (spec.getTopicPartitions().isEmpty()) {

      LOG.warn("Looks like generateSplits() is not called. Generate single split.");

      try {

        return new KafkaUnboundedReader<>(split(1, options).get(0), checkpointMark);

      } catch (Exception e) {

        throw new RuntimeException(e);

      }

    }

    return new KafkaUnboundedReader<>(this, checkpointMark);

  }



  @Override

  public Coder<KafkaCheckpointMark> getCheckpointMarkCoder() {

    return AvroCoder.of(KafkaCheckpointMark.class);

  }



  @Override

  public boolean requiresDeduping() {

    // Kafka records are ordered with in partitions. In addition checkpoint guarantees

    // records are not consumed twice.

    return false;

  }



  @Override

  public Coder<KafkaRecord<K, V>> getOutputCoder() {

    return KafkaRecordCoder.of(spec.getKeyCoder(), spec.getValueCoder());

  }



  /////////////////////////////////////////////////////////////////////////////////////////////



  private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedSource.class);



  private final KafkaIO.Read<K, V> spec; // Contains all the relevant configuratiton of the source.

  private final int id; // split id, mainly for debugging



  public KafkaUnboundedSource(KafkaIO.Read<K, V> spec, int id) {

    this.spec = spec;

    this.id = id;

  }



  KafkaIO.Read<K, V> getSpec() {

    return spec;

  }



  int getId() {

    return id;

  }

}